/*
 * jETeL/CloverETL - Java based ETL application framework.
 * Copyright (c) Javlin, a.s. (info@cloveretl.com)
 *  
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */
package org.jetel.component;

import java.io.IOException;
import java.sql.BatchUpdateException;
import java.sql.SQLException;
import java.sql.Savepoint;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jetel.connection.jdbc.ConnectionAction;
import org.jetel.connection.jdbc.SQLCloverStatement;
import org.jetel.connection.jdbc.SQLUtil;
import org.jetel.data.DataRecord;
import org.jetel.data.DataRecordFactory;
import org.jetel.data.Defaults;
import org.jetel.data.parser.TextParser;
import org.jetel.data.parser.TextParserFactory;
import org.jetel.database.IConnection;
import org.jetel.database.sql.DBConnection;
import org.jetel.database.sql.JdbcSpecific.OperationType;
import org.jetel.database.sql.QueryType;
import org.jetel.database.sql.SqlConnection;
import org.jetel.exception.AttributeNotFoundException;
import org.jetel.exception.ComponentNotReadyException;
import org.jetel.exception.ConfigurationStatus;
import org.jetel.exception.JetelException;
import org.jetel.exception.XMLConfigurationException;
import org.jetel.graph.IGraphElement;
import org.jetel.graph.InputPort;
import org.jetel.graph.Node;
import org.jetel.graph.OutputPort;
import org.jetel.graph.Result;
import org.jetel.graph.TransformationGraph;
import org.jetel.graph.modelview.MVMetadata;
import org.jetel.graph.modelview.impl.MetadataPropagationResolver;
import org.jetel.metadata.DataFieldMetadata;
import org.jetel.metadata.DataFieldType;
import org.jetel.metadata.DataRecordMetadata;
import org.jetel.metadata.DataRecordParsingType;
import org.jetel.util.AutoFilling;
import org.jetel.util.ExceptionUtils;
import org.jetel.util.ReadableChannelIterator;
import org.jetel.util.SynchronizeUtils;
import org.jetel.util.file.FileUtils;
import org.jetel.util.joinKey.JoinKeyUtils;
import org.jetel.util.property.ComponentXMLAttributes;
import org.jetel.util.property.RefResFlag;
import org.jetel.util.string.StringUtils;
import org.w3c.dom.Element;

/**
 *  <h3>DatabaseOutputTable Component</h3>
 * <!-- This component performs DML operation on specified database table (insert/update/delete). -->
 *
 * <table border="1">
 * <th>Component:</th>
 * <tr><td><h4><i>Name:</i></h4></td>
 * <td>DBOutputTable</td></tr>
 * <tr><td><h4><i>Category:</i></h4></td>
 * <td></td></tr>
 * <tr><td><h4><i>Description:</i></h4></td>
 * <td>This component performs specified DML operation (insert/update/delete) on specified database table.<br>
 *   Parameter placeholder in DML statement is [?] - question mark</td></tr>
 * <tr><td><h4><i>Inputs:</i></h4></td>
 * <td>[0]- input records</td></tr>
 * <tr><td><h4><i>Outputs:</i></h4></td>
 * <td>[0] <i>optional</i> - records rejected by database. If this metadata has more fields than the input metadata
 * and the last field is of type string, this field is filled by the error message<br>
 * 	   [1] <i>optional</i> - autogenerated columns:
 * <ul><li>autogenerated columns for <i>insert</i> statement (supported only for Oracle, MySQL, Db2 and Informix
 *  databases, not supported in batch mode at all)</li>
 *  <li>number of updated records in database and requested input fields for <i>update, delete</i> statement</li></ul>
 *  Key record is generated for <b>each</b> input record. In case that
 *  statement execution fails, requested fields are not filled.</td></tr>
 * <tr><td><h4><i>Comment:</i></h4></td>
 * <td></td></tr>
 * </table>
 *  <br>
 *  <table border="1">
 *  <th>XML attributes:</th>
 *  <tr><td><b>type</b></td><td>"DB_OUTPUT_TABLE"</td></tr>
 *  <tr><td><b>id</b></td><td>component identification</td></tr>
 *  <tr><td><b>dbConnection</b></td><td>id of the Database Connection object to be used to access the database</td>
 *  <tr><td><b>dbTable</b><br><i>optional</i></td><td>name of the DB table to populate data with</td>
 *  <tr><td><b>sqlQuery</b><br><i>optional</i></td><td>allows specification of SQL query/DML statement to be executed against
 *  database. It can consist of more than one query separated by semicolon [;]. Question marks [?] in the query text 
 *  are placeholders which are filled with values from input fields specified in <b>cloverFields</b>
 *  attribute. If you have query in this form, <i>cloverFields</i> must be specified as well - it determines which input fields will
 *  be used/mapped onto target fields. You can write query with direct mapping too: instead of placeholders use clover field's names 
 *  preceded by dollar char [$]. In such form you can use mapping between generated keys and output record with this keys too.
 *  Complete query should appear as follows:<br><ul>
 *  <li><code>insert into mytable [(f1,f2,...,fn)] values (val1, $field2, ...,$fieldm ) returning $key := dbfield1, $field := dbfield2</code> - 
 *  where <i>f1,f2,...,fn,dbfield1,dbfield2</i> are database fields; <i>field2,.., fieldm</i> are input record fields 
 *  and <i>key, field</i> are key record fields. <b>This is valid for databases which can return more than 
 *  one column in <i>getGeneratedKeys()</i> method (Oracle and db2 for the time being). </b> </li>
 *  <li><code>insert into mytable [(f1,f2,...,fn)] values (val1, $field2, ...,$fieldm ) returning $key := auto_generated, $field := infield</code> - 
 *  where <i>f1,f2,...,fn</i> are database fields; <i>field2,.., fieldm, infield</i> are input record fields, 
 *  <i>auto_generated</i>  is auto generated column value returned by database and <i>key, field</i> are key record 
 *  fields. <b>This is valid for databases which returns one auto generated column in <i>getGeneratedKeys()</i> method 
 *  (MySql and Informix for the time being). </b> </li>
 *  <li><code>delete from mytable where f1 = $field1 and ... fn = $fieldn returning $updated:=update_count, $field:=infield</code> - where <i>f1,..,fn</i> are database
 *  fields, <i>field1,...,fieldn, infield</i> are input record fields, <i>updated, field</i> are output record's fields and
 *  <i>update_count</i> means number of updated records in database by current statement</li>
 *  <li><code>update mytable set f1 = $field1,...,fn=$fieldn where db1=$f1 returning $updated:=update_count, $field:=infield</code> - where <i>f1,..,fn, db1</i> are database
 *  fields, <i>field1,...,fieldn, infield</i> are input record fields, <i>updated, field</i> are output record's fields and
 *  <i>update_count</i> means number of updated records in database by current statement</li></ul>
 *  <tr><td><b>url</b><br><i>optional</i></td><td>url location of the query. The query will be loaded from file referenced by the url. 
 *  Rules for extern query are the same as for <i>sqlQuery</i> parameter.</td>
 *  <tr><td><b>charset </b><i>optional</i></td><td>encoding of extern query</td></tr>
 * <tr><td>&lt;SQLCode&gt;<br><i>optional<small>!!XML tag!!</small></i></td><td>This tag allows for embedding large SQL statement directly into graph.. See example below.</td></tr>
 *  <tr><td><b>fieldMap</b><br><i>optional</i></td><td>Pairs of clover fields and db fields (cloverField=dbField) separated by :;| {colon, semicolon, pipe}.<br>
 *  It specifies mapping from source (Clover's) fields to DB table fields if it isn't specified in <i>sqlQuery</i>. It should be used instead of <i>cloverFields</i> and <i>dbFields</i>
 *  attributes, because it provides more clear mapping. If <i>fieldMap</i> attribute is found <i>cloverFields</i> and <i>dbFields</i> attributes are ignored.
 *  <tr><td><b>dbFields</b><br><i>optional</i></td><td>delimited list of target table's fields to be populated<br>
 *  Input fields are mapped onto target fields (listed) in the order they are present in Clover's record.</td>
 *  <tr><td><b>commit</b><br><i>optional</i></td><td>determines how many records are in one db commit. Minimum 1, DEFAULT is 100.<br>If
 * MAX_INT is specified, it is considered as NEVER COMMIT - i.e. records are sent to DB without ever issuing commit. It can
 * be called later from within other component - for example DBExecute.</td>
 *  <tr><td><b>cloverFields</b><br><i>optional</i></td><td>delimited list of input record's fields.<br>Only listed fields (in the order
 *  they appear in the list) will be considered for mapping onto target table's fields. Combined with <b>dbFields</b> option you can
 *  specify mapping from source (Clover's) fields to DB table fields. If no <i>dbFields</i> are specified, then #of <i>cloverFields</i> must
 *  correspond to number of target DB table fields.</td>
 *   <tr><td><b>autoGeneratedColumns</b><br><i>deprecated</i> - use <i>sqlQuery</i> extended form</td><td>
 *   This attribute can be used for obtaining auto generated columns, but only in case that <i>sqlQuery</i> consist
 *   of only <b>one</b> query. In other case construct queries with direct mapping. 
 *   <ul><li>For Oracle or Db2 database: names of database columns to be returned (for Db2 - entity columns)</li>
 *   <li>For MySQL or Informix database: names of input record fields plus special field called "AUTO_GENERATED" to be returned</td></tr>
 *  <tr><td><b>batchMode</b><br><i>optional</i></td><td>[Yes/No] determines whether to use batch mode for sending statements to DB, DEFAULT is No.<br>
 *  <i>Note:If your database/JDBC driver supports this feature, switch it on as it significantly speeds up table population.</i></td>
 *  </tr>
 * <tr><td><b>batchSize</b><br><i>optional</i></td><td>number - determines how many records will be sent to database in one batch update. Default is 25.
 * </td>
 *  </tr> 
 *   <tr><td><b>maxErrors</b><br><i>optional</i></td><td>maximum number of allowed SQL errors. Default: 0 (zero). If exceeded, component stops with error. If set to <b>-1</b>(minus one) all errors are ignored.</td></tr>
 *   <tr><td><b>errorAction</b><br><i>optional</i></td><td>ROLLBACK or COMMIT (case sensitive!!!). Default: COMMIT. 
 *   Action performed when exceeded maximum number of records or execution of the component is aborted.</td></tr>
 *  </table>
 *
 *  <h4>Example:</h4>
 *  <pre>&lt;Node id="OUTPUT" type="DB_OUTPUT_TABLE" dbConnection="NorthwindDB" dbTable="employee_z"/&gt;</pre>
 *  <br>
 *  <pre>&lt;Node id="OUTPUT" type="DB_OUTPUT_TABLE" dbConnection="NorthwindDB" dbTable="employee_z" dbFields="f_name;l_name;phone"/&gt;</pre>
 *  <i>Example above shows how to populate only selected fields within target DB table. It can be used for skipping target fields which
 *  are automatically populated by DB (such as autoincremented fields).</i>
 *  <br>
 *  <pre>&lt;Node id="OUTPUT" type="DB_OUTPUT_TABLE" dbConnection="NorthwindDB" dbTable="employee_z"
 *	   dbFields="f_name;l_name" cloverFields="LastName;FirstName"/&gt;</pre>
 *  <i>Example shows how to simply map Clover's LastName and FirstName fields onto f_name and l_name DB table fields. The order
 *  in which these fields appear in Clover data record is not important.</i>
 *  <br>
 *   <pre>&lt;Node id="OUTPUT" type="DB_OUTPUT_TABLE" dbConnection="NorthwindDB" sqlQuery="insert into myemployee2 (FIRST_NAME,LAST_NAME,DATE,ID) values (?,?,sysdate,123)"
 *	   cloverFields="FirstName;LastName"/&gt;</pre>
 *  <br>
 * <pre>&lt;Node id="OUTPUT" type="DB_OUTPUT_TABLE" dbConnection="NorthwindDB" cloverFields="FirstName;LastName"&gt;
 *  &lt;SQLCode&gt;
 *	insert into myemployee2 (FIRST_NAME,LAST_NAME,DATE,ID) values (?,?,sysdate,123)
 *  &lt;/SQLCode&gt;
 *  &lt;/Node&gt;</pre>
 *  <i>Example below shows how to delete records in table using DBOutputTable component</i>
 *  <pre>&lt;Node id="OUTPUT" type="DB_OUTPUT_TABLE" dbConnection="NorthwindDB" cloverFields="FirstName;LastName"&gt;
 *  &lt;SQLCode&gt;
 *  delete from myemployee2 where FIRST_NAME = ? and LAST_NAME = ?
 *  &lt;/SQLCode&gt;
 *  &lt;/Node&gt;</pre>
 * <br>
 *  <i>Example below shows usage of "fieldMap" attribute </i>
 * <pre>&lt;Node dbConnection="DBConnection0" dbTable="employee_tmp" fieldMap=
 * "EMP_NO=emp_no;FIRST_NAME=first_name;LAST_NAME=last_name;PHONE_EXT=phone_ext"
 * id="OUTPUT" type="DB_OUTPUT_TABLE"/&gt;</pre>
 * <br>
 * <i>Examples below show how to get autogenerated columns</i>
 * <pre>
 * &lt;Node dbConnection="Connection1" id="OUTPUT" maxErrors="10" sqlQuery=
 * "INSERT INTO CLOVER_USER (U_ID,NAME ,CREATED) values ($EMP_NO, $FULL_NAME,  $HIRE_DATE);
 *  INSERT INTO MYEMPLOYEE (EMP_NO, FIRST_NAME, LAST_NAME,COUNTRY, SALARY, FULL_NAME) VALUES 
 *                         ($EMP_NO, $FIRST_NAME, $LAST_NAME, $COUNTRY, $SALARY, $FULL_NAME) 
 *                         RETURNING $id:=auto_generated, $Field2:=full_name;" type="DB_OUTPUT_TABLE"/&gt;
 * 
 * &lt;Node dbConnection="Connection1" id="OUTPUT" maxErrors="10" sqlQuery=
 * "DELETE FROM CLOVER_USER WHERE U_ID = $EMP_NO;
 *  INSERT INTO MYEMPLOYEE (ID, EMP_NO, FIRST_NAME, LAST_NAME,COUNTRY, SALARY, FULL_NAME) VALUES 
 *                         (id_seq.nextval, $EMP_NO, $FIRST_NAME, $LAST_NAME, $COUNTRY, $SALARY, $FULL_NAME) 
 *                         RETURNING $id:=ID, $Field2:=FIRST_NAME;" type="DB_OUTPUT_TABLE" errorAction="ROLLBACK"/&gt;
 * 
 *&lt;Node autoGeneratedColumns="AUTO_GENERATED;FIRST_NAME;" dbConnection="DBConnection2" id="OUTPUT" maxErrors="10" 
 *	sqlQuery="INSERT INTO myemployee (EMP_NO, FIRST_NAME, LAST_NAME, PHONE_EXT, HIRE_DATE, DEPT_NO, JOB_CODE, JOB_GRADE, JOB_COUNTRY, SALARY, FULL_NAME) 
 *	VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" type="DB_OUTPUT_TABLE"/&gt;
 *
 *&lt;Node autoGeneratedColumns="ID;FIRST_NAME;" batchMode="false" dbConnection="DBConnection1" 
 *	dbTable="MYEMPLOYEE" id="OUTPUT" maxErrors="10" sqlQuery="INSERT INTO myemployee VALUES (id_seq.nextval, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" 
 *	type="DB_OUTPUT_TABLE"/&gt;
 *
 *<i>Example below shows how to get the number of updated records in database</i>
 * <pre>
 *&lt;Node batchMode="false" commit="10" dbConnection="Connection1" errorAction="ROLLBACK" id="OUTPUT" maxErrors="10" 
 *	sqlQuery="update customers  set prijmeni =$LAST_NAME  where klient_id_NO=$EMP_NO returning $id:=update_count, $Field2:=FULL_NAME;" 
 *	type="DB_OUTPUT_TABLE"/&gt;
 * </pre>
 * @author      dpavlis, avackova (agata.vackova@javlinconsulting.cz)
 * @since       September 27, 2002
 * @created     22. July 2003
 * @see         org.jetel.database.AnalyzeDB
 */
public class DBOutputTable extends Node implements MetadataProvider {
	
	private final static String OUT_METADATA_ID_SUFFIX = "_outMetadata";
	
	public static final String XML_MAXERRORS_ATRIBUTE = "maxErrors";
	public static final String XML_BATCHMODE_ATTRIBUTE = "batchMode";
	public static final String XML_COMMIT_ATTRIBUTE = "commit";
	public static final String XML_FIELDMAP_ATTRIBUTE = "fieldMap";
	public static final String XML_CLOVERFIELDS_ATTRIBUTE = "cloverFields";
	public static final String XML_DBFIELDS_ATTRIBUTE = "dbFields";
	public static final String XML_SQLCODE_ELEMENT = "SQLCode";
	public static final String XML_DBTABLE_ATTRIBUTE = "dbTable";
	public static final String XML_DBCONNECTION_ATTRIBUTE = "dbConnection";
	public static final String XML_SQLQUERY_ATRIBUTE = "sqlQuery";
	public static final String XML_BATCHSIZE_ATTRIBUTE = "batchSize";
	public static final String XML_URL_ATTRIBUTE = "url";
	public static final String XML_CHARSET_ATTRIBUTE = "charset";
	public static final String XML_AUTOGENERATEDCOLUMNS_ATTRIBUTE = "autoGeneratedColumns";
	public static final String XML_ACTION_ON_ERROR = "errorAction";
	public static final String XML_ATOMIC_RECORD_STATEMENT_ATTRIBUTE="atomicSQL";

	private DBConnection dbConnection;
	private SqlConnection connection;
	private String dbConnectionName;
	private String dbTableName;
	private SQLCloverStatement[] statement;
	private String[] cloverFields;
	private String[] dbFields;
	private String[] sqlQuery;
	private String queryURL;
	private String charset;
	private int recordsInCommit;
	private int maxErrors;
	private boolean useBatch;
	private int batchSize;
    private int countError=0;
	private String[] autoGeneratedColumns = null;
	private boolean[] returnResult;
	private ConnectionAction errorAction = ConnectionAction.COMMIT;
	private boolean atomicSQL;
	ReadableChannelIterator channelReadingIterator; // for reading the query from dictionary
	
	private InputPort inPort;
	private OutputPort rejectedPort, keysPort;
	private DataRecord inRecord, rejectedRecord, keysRecord;
	private int recCount = 0;
	private int errorCodeFieldNum;
	private int errMessFieldNum;
	private int failedBatches;
	private Savepoint savepoint;
	private static final String SAVEPOINT_NAME = "svpnt";
	
	public final static String COMPONENT_TYPE = "DB_OUTPUT_TABLE";
	private final static int READ_FROM_PORT = 0;
	private final static int WRITE_REJECTED_TO_PORT = 0;
	private final static int WRITE_AUTO_KEY_TO_PORT = 1;
	private final static int RECORDS_IN_COMMIT = 100;
	private final static int RECORDS_IN_BATCH = 25;
	private final static int MAX_ALLOWED_ERRORS = 0;
	private final static int MAX_WARNINGS = 3;
	
	static Log logger = LogFactory.getLog(DBOutputTable.class);

	/**
	 *  Constructor for the DBInputTable object
	 *
	 * @param  id                Unique ID of component
	 * @param  dbConnectionName  Name of Clover's database connection to be used for communicating with DB
	 * @param  dbTableName       Name of target DB table to be populated with data
	 * @since                    September 27, 2002
	 */
	public DBOutputTable(String id, String dbConnectionName, String dbTableName) {
		this(id,dbConnectionName);
		this.dbTableName = dbTableName;
	}

	/**
	 * @param id Unique ID of component
	 * @param dbConnectionName Name of Clover's database connection to be used for communicating with DB
	 * @param sqlQuery set of sql queries
	 */
	public DBOutputTable(String id, String dbConnectionName, String[] sqlQuery) {
		this(id,dbConnectionName);
		setSqlQuery(sqlQuery);
	}
	
	/**
	 * Constructor for the DBInputTable object
	 * @param id				Unique ID of component
	 * @param dbConnectionName	Name of Clover's database connection to be used for communicating with DB
	 * @param sqlQuery			SQL query to be executed against DB - can be any DML command (INSERT, UPDATE, DELETE)
	 * @param cloverFields		Array of Clover field names (the input data) which should substitute DML command parameters (i.e. "?")
	 */
	@Deprecated
	public DBOutputTable(String id, String dbConnectionName, String sqlQuery, String[] cloverFields) {
		this(id, dbConnectionName, new String[]{sqlQuery});
		setCloverFields(cloverFields);
	}
	
	/**
	 * Constructor for the DBInputTable object
	 */
	DBOutputTable(String id, String dbConnectionName){
		super(id);
		this.dbConnectionName = dbConnectionName;
		this.dbTableName = null;
		cloverFields = null;
		dbFields = null;
		recordsInCommit = RECORDS_IN_COMMIT;
		maxErrors=MAX_ALLOWED_ERRORS;
		useBatch=false;
		batchSize=RECORDS_IN_BATCH;
	}
	
	/**
	 *  Sets the dBFields attribute of the DBOutputTable object
	 *
	 * @param  dbFields  The new dBFields value
	 */
	public void setDBFields(String[] dbFields) {
		this.dbFields = dbFields;
	}


	/**
	 *  Sets the useBatch attribute of the DBOutputTable object
	 *
	 * @param  batchMode  The new useBatch value
	 */
	public void setUseBatch(boolean batchMode) {
		this.useBatch = batchMode;
	}

	/**
	 * Sets batch size - how many records are in batch which is sent
	 * to DB at once.
	 * @param batchSize
	 */
	public void setBatchSize(int batchSize){
	    this.batchSize=batchSize;
	}
	
	/**
	 * Sets atomicSQL attribute. If true all sql statements form one record are executed in one transaction and commit is performed after each record  
	 * @param atomicSQL
	 */
	public void setAtomicSQL(boolean atomicSQL) {
    this.atomicSQL = atomicSQL;
  }

	/**
	 *  Sets the cloverFields attribute of the DBOutputTable object
	 *
	 * @param  cloverFields  The new cloverFields value
	 */
	public void setCloverFields(String[] cloverFields) {
		this.cloverFields = cloverFields;
	}

	public String[] getSqlQuery() {
		return sqlQuery;
	}

	public void setSqlQuery(String[] sqlQuery) {
		// filter empty queries
		ArrayList<String> queries = new ArrayList<>();
		for(int i = 0; i < sqlQuery.length; i++) {
			if (sqlQuery[i] != null && sqlQuery[i].trim().length() > 0) {
				queries.add(sqlQuery[i]);
			}
		}
		this.sqlQuery=queries.toArray(new String[queries.size()]);
	}

	public void setSqlQuery(String sqlQuery) {
		this.sqlQuery = StringUtils.isEmpty(sqlQuery) ? null : new String[] { sqlQuery };
	}

	private void setCharset(String charset) {
		this.charset = charset;
	}

	public String getCharset() {
		return charset;
	}

	private void setQueryURL(String queryURL) {
		this.queryURL = queryURL;
	}

	/**
	 * Description of the Method
	 * 
	 * @exception ComponentNotReadyException
	 *                Description of Exception
	 * @since September 27, 2002
	 */
	@SuppressWarnings("deprecation")
	@Override
	public void init() throws ComponentNotReadyException {
		super.init();
		// get dbConnection from graph
		IConnection conn = getGraph().getConnection(dbConnectionName);
		if (conn == null) {
			throw new ComponentNotReadyException("Can't find DBConnection ID: " + dbConnectionName);
		}
		if (!(conn instanceof DBConnection)) {
			throw new ComponentNotReadyException("Connection with ID: " + dbConnectionName + " isn't instance of the DBConnection class.");
		}
		dbConnection = (DBConnection) conn;
		dbConnection.init();

		inPort = getInputPort(READ_FROM_PORT);

		rejectedPort = getOutputPort(WRITE_REJECTED_TO_PORT);
		rejectedRecord = rejectedPort != null ? DataRecordFactory.newRecord(rejectedPort.getMetadata()) : null;
		if (rejectedRecord != null) {
			errorCodeFieldNum = rejectedRecord.getMetadata().findAutoFilledField(AutoFilling.ERROR_CODE);
			errMessFieldNum = rejectedRecord.getMetadata().findAutoFilledField(AutoFilling.ERROR_MESSAGE);
			if (errMessFieldNum == -1) {
				DataRecordMetadata rejectedMetadata = rejectedPort.getMetadata();
				if (inPort.getMetadata().getNumFields() == rejectedMetadata.getNumFields() - 1 && rejectedMetadata.getField(rejectedMetadata.getNumFields() - 1).getType() == DataFieldMetadata.STRING_FIELD) {
					errMessFieldNum = rejectedMetadata.getNumFields() - 1;
				}
			}
		}

		// create insert query from db table name
		if (sqlQuery == null && queryURL == null) {
			sqlQuery = new String[1];
			
			// TODO Labels replace:
			if (dbFields != null) {
				sqlQuery[0] = SQLUtil.assembleInsertSQLStatement(dbTableName, dbFields, dbConnection.getJdbcSpecific());
			} else {
				sqlQuery[0] = SQLUtil.assembleInsertSQLStatement(
						inPort.getMetadata(), dbTableName, dbConnection.getJdbcSpecific());
			}
			// TODO Labels replace end

			// TODO Labels replace with:
//			// FIXME This also replaces escaped characters from dbTableName
//			// can lead to backslashes being consumed
//			String quotedTableName = StringUtils.stringToSpecChar(dbConnection.getJdbcSpecific().quoteIdentifier(dbTableName));
//			if (dbFields != null) {
//				String[] quotedDbFields = new String[dbFields.length];
//				for (int i = 0; i < dbFields.length; i++) {
//					quotedDbFields[i] = dbConnection.getJdbcSpecific().quoteIdentifier(dbFields[i]);
//				}
//				sqlQuery[0] = SQLUtil.assembleInsertSQLStatement(quotedTableName, quotedDbFields);
//			} else {
//				sqlQuery[0] = SQLUtil.assembleInsertSQLStatement(inPort.getMetadata(), quotedTableName);
//			}
			// TODO Labels replace with end
		}

		// The rest of initialization the connection is required, so it is done in first run of preExecute
		
	}

	@SuppressWarnings("deprecation")
	@Override
	public void preExecute() throws ComponentNotReadyException {
		super.preExecute();
		
		inRecord = DataRecordFactory.newRecord(inPort.getMetadata());
		
		// create connection instance, which represents connection to a database
		try {
			connection = dbConnection.getConnection(getId(), OperationType.WRITE);
		} catch (JetelException e1) {
			throw new ComponentNotReadyException(e1);
		}

		if (firstRun()) {// a phase-dependent part of initialization

			// get query from query url
			if (sqlQuery == null && queryURL != null) {
				if (queryURL.startsWith("dict:")) {
					channelReadingIterator = new ReadableChannelIterator(null, getGraph().getRuntimeContext().getContextURL(), queryURL);
					channelReadingIterator.setCharset(charset);
					channelReadingIterator.setDictionary(getGraph().getDictionary());
					channelReadingIterator.init();
					try {
						Object next = channelReadingIterator.next();
						DataRecordMetadata queryMetadata = new DataRecordMetadata("_query_metadata_", DataRecordParsingType.DELIMITED);
						DataFieldMetadata queryField = new DataFieldMetadata("_query_field_", DataFieldType.STRING, null);
						queryField.setEofAsDelimiter(true);
						queryField.setTrim(true);
						queryMetadata.addField(queryField);
						TextParser dictParser = TextParserFactory.getParser(queryMetadata, charset);
						dictParser.init();
						dictParser.setDataSource(next);
						DataRecord queryRecord = DataRecordFactory.newRecord(queryMetadata);
						if ((queryRecord = dictParser.getNext(queryRecord)) != null) {
							sqlQuery = new String[] { getPropertyRefResolver().resolveRef(queryRecord.getField(0).toString()) };
						}
					} catch (JetelException | IOException e) {
						throw new ComponentNotReadyException(e);
					}
				} else {
					String rawContents = FileUtils.getStringFromURL(getGraph().getRuntimeContext().getContextURL(), queryURL, charset);
					setSqlQuery(SQLUtil.split(getGraph().getPropertyRefResolver().resolveRef(rawContents, null)));
				}
			}
			
			keysPort = getOutputPort(WRITE_AUTO_KEY_TO_PORT);
			returnResult = new boolean[sqlQuery.length];
			Arrays.fill(returnResult, false);
			keysRecord = keysPort != null ? DataRecordFactory.newRecord(keysPort.getMetadata()) : null;

			// prepare set of statements
			statement = new SQLCloverStatement[sqlQuery.length];
			if (statement.length > 1 && autoGeneratedColumns != null) {
				logger.warn("Found more then one sql query and " + XML_AUTOGENERATEDCOLUMNS_ATTRIBUTE + " parameter. The last one will be ignored");
				autoGeneratedColumns = null;
			}

			// prepare rejectedRecord and keysRecord
			boolean supportsConnectionKeyGenaration = false;
			try {
				supportsConnectionKeyGenaration = dbConnection.getJdbcSpecific().supportsGetGeneratedKeys(connection.getMetaData());
			} catch (SQLException e1) {
				// TODO Auto-generated catch block
			}

			int end, start = 0;
			String[] tmpCloverFields = null;
			for (int i = 0; i < statement.length; i++) {
				if (cloverFields != null) {
					end = StringUtils.count(sqlQuery[i], '?');
					tmpCloverFields = new String[end - start];
					for (int j = 0; j < tmpCloverFields.length; j++) {
						if (start + j >= cloverFields.length) {
							throw new ComponentNotReadyException(this, XML_CLOVERFIELDS_ATTRIBUTE, "Missing parameter value for query " + StringUtils.quote(sqlQuery[i] + " , parameter number: " + (start + j + 1)));
						}
						tmpCloverFields[j] = cloverFields[start + j];
					}
					start = end;
				}
				statement[i] = new SQLCloverStatement(connection, sqlQuery[i], inRecord, tmpCloverFields, autoGeneratedColumns);
				if (statement[i].getQueryType() == QueryType.INSERT && statement[i].returnResult()) {
					if (useBatch) {
						logger.warn("Getting generated keys in batch mode is not supported -> switching it off !");
						sqlQuery[i] = sqlQuery[i].substring(0, SQLCloverStatement.indexOfReturning(sqlQuery[i], connection));
						statement[i] = new SQLCloverStatement(connection, sqlQuery[i], inRecord, cloverFields);
					} else if (!supportsConnectionKeyGenaration) {
						logger.warn("DB indicates no support for getting generated keys -> switching it off !");
						sqlQuery[i] = sqlQuery[i].substring(0, SQLCloverStatement.indexOfReturning(sqlQuery[i], connection));
						statement[i] = new SQLCloverStatement(connection, sqlQuery[i], inRecord, cloverFields);
					}
				}
			}

			// check that what we require is supported
			try {
				if (useBatch && !connection.getMetaData().supportsBatchUpdates()) {
					logger.warn("DB indicates no support for batch updates -> switching it off !");
					useBatch = false;
				}
			} catch (SQLException e) {
			}

			if (keysRecord != null) {
				for (int i = 0; i < returnResult.length; i++) {
					returnResult[i] = statement[i].returnResult();
				}
			}

			// when no one of queries returns something switch off key port
			if (keysRecord != null) {
				int i;
				for (i = 0; i < returnResult.length; i++) {
					if (returnResult[i])
						break;
				}
				if (i == returnResult.length) {
					keysRecord = null;
				}
			}

			// it is probably wise to have COMMIT size multiplication of BATCH size
			// except situation when commit size is MAX_INTEGER -> we never commit in this situation;
			if (useBatch && recordsInCommit != Integer.MAX_VALUE && (recordsInCommit % batchSize != 0)) {
				int multiply = recordsInCommit / batchSize;
				recordsInCommit = (multiply + 1) * batchSize;
			}

			// init statements
			SQLCloverStatement eachStatement;
			for (int i = 0; i < statement.length; i++) {
				eachStatement = statement[i];
				eachStatement.setLogger(logger);
				try {
					eachStatement.init();
				} catch (Exception e) {
					throw new ComponentNotReadyException(this, e);
				}
			}
		} else {
			if (rejectedRecord != null) {
				rejectedRecord.reset();
			}
			if (keysRecord != null) {
				keysRecord.reset();
			}
			for (SQLCloverStatement eachStatement : statement) {
				try {
					eachStatement.setConnection(connection);
					eachStatement.setInRecord(inRecord);
					eachStatement.reset();
				} catch (Exception e) {
					throw new ComponentNotReadyException(this, e);
				}
			}
			recCount = 0;
			countError = 0;
		}
	}
	
	@Override
	public void postExecute() throws ComponentNotReadyException {
		try {
			super.postExecute();
			if (recordsInCommit != Integer.MAX_VALUE) {
				// CLO-6100: do not close the connection, as we expect the graph to perform commit
				dbConnection.closeConnection(getId(), OperationType.WRITE);
			}
		} finally {
			ReadableChannelIterator.postExecute(channelReadingIterator);
		}
	}
	
	/**
	 * @param dbTableName The dbTableName to set.
	 */
	public void setDBTableName(String dbTableName) {
		this.dbTableName = dbTableName;
	}
	
	/**
	 *  Sets the recordsInCommit attribute of the DBOutputTable object
	 *
	 * @param  nRecs  The new recordsInCommit value
	 */
	public void setRecordsInCommit(int nRecs) {
		if (nRecs > 0) {
			recordsInCommit = nRecs;
		}
	}

	@Override
	public Result execute() throws Exception {
		/*
		 * Run main processing loop
		 */
		try{
			if (useBatch){
				runInBatchMode();
			}else{
				runInNormalMode();
			}
		}catch (InterruptedException e) {
			if (errorAction == ConnectionAction.ROLLBACK) {
				errorAction.perform(connection);
				if (errorAction == ConnectionAction.ROLLBACK) {
					logger.info("Rollback performed.");
					logger.info("Number of commited records: " + (recCount / recordsInCommit)*recordsInCommit);
				}else{
					logger.info("Number of commited records: " + recCount);
				}
				logger.info("Rollback performed.");
				logger.info("Number of commited records: " + (recCount / recordsInCommit)*recordsInCommit);
			}else if (recordsInCommit!=Integer.MAX_VALUE){
				errorAction.perform(connection);
				logger.info("Number of commited records: " + recCount);
			}
		} finally {
			broadcastEOF();
    		try {
    			for (SQLCloverStatement eachStatement : statement) {
    				eachStatement.close();
    			}
			} catch (SQLException exception) {
				logger.warn("SQLException when closing statement", exception);
			}
		}
        return runIt ? Result.FINISHED_OK : Result.ABORTED;
	}
	
	/**
	 * Releases resources held by this component. The channel-reading iterator
	 * is freed first; the superclass cleanup is guaranteed to run afterwards
	 * even if freeing the iterator throws.
	 */
	@Override
	public synchronized void free() {
		try {
			ReadableChannelIterator.free(channelReadingIterator);
		} finally {
			super.free();
		}
	}

	/**
	 * Writes records one at a time: for each input record every prepared
	 * statement is executed in order. SQL errors are counted, optionally
	 * routed to the rejected port, and a commit is issued every
	 * {@code recordsInCommit} records (or after each record when
	 * {@code atomicSQL} is on).
	 *
	 * @throws JetelException when the number of SQL errors exceeds {@code maxErrors}
	 */
	private void runInNormalMode() throws SQLException,InterruptedException,IOException, JetelException{
		String errmes = "";
		SQLException exception = null;
		// Some DBs (e.g. PostgreSQL) abort the whole transaction on any error;
		// savepoints allow partial rollback so the rest of it survives (#5711).
		boolean useSavepoints = connection.getJdbcSpecific().useSavepoints();
		
		while ((inRecord = inPort.readRecord(inRecord)) != null && runIt) {
			if (keysRecord != null){
				keysRecord.reset();
			}
			//execute all statements
			for (int i=0; i<statement.length; i++) {
				try {
					// Fix of issue #5711; For PostgresSQL we need to set SAVEPOINT for partial rollback in case of SQL exception
					if (useSavepoints && !atomicSQL) {
						try {
							savepoint = connection.setSavepoint(SAVEPOINT_NAME);
						} catch (SQLException e) {
							logger.warn("Failed to set SAVEPOINT; rest of transaction may be lost", e);
						}
					}

					statement[i].executeUpdate(returnResult[i] ? keysRecord : null);
				} catch(SQLException ex) {
					countError++;
					exception = ex;
					// Build a message including the whole chained-exception list.
					errmes = "Exception thrown by: " + statement[i].getQuery() + ". Message: " + ExceptionUtils.getMessage(ex);
					SQLException chain = ex.getNextException();
					while (chain != null) {
						errmes += "\n  Caused by: " + ExceptionUtils.getMessage(chain);
						chain = chain.getNextException();
					}

					// Route the failing record (with error message/code fields
					// filled, when mapped) to the rejected port.
					if (rejectedPort != null) {
						rejectedRecord.copyFieldsByName(inRecord);
						if (errMessFieldNum != -1) {
							rejectedRecord.getField(errMessFieldNum).setValue(errmes);
						}
						if (errorCodeFieldNum != -1){
							rejectedRecord.getField(errorCodeFieldNum).setValue(ex.getErrorCode());
						}
						rejectedPort.writeRecord(rejectedRecord);
					}
					// Log at most MAX_WARNINGS individual errors, then a single
					// "more errors..." marker.
					if (countError <= MAX_WARNINGS) {
						logger.warn(errmes);
					}else if (countError == MAX_WARNINGS + 1){
						logger.warn("more errors...");
					}
					// if atomicity of all sql statements is required, rollback current transaction and cancel executing following statements of this record
					if (atomicSQL) {
						connection.rollback();
						logger.info("AtomicSQL is true. Rollback performed.");
						break;
					} else if (useSavepoints && savepoint != null) {
						// Fix of issue #5711; For PostgresSQL rollback to last SAVEPOINT (which was set after last successful statement)
						connection.rollback(savepoint);
					}
					
				}
			}
			//send filled key record to output
			if (keysRecord != null) {
				keysPort.writeRecord(keysRecord);
			}
			//if number of errors is greater then allowed throw exception
			if (countError>maxErrors && maxErrors!=-1){
				//Perform commit or rollback
				errorAction.perform(connection);
				if (errorAction == ConnectionAction.ROLLBACK) {
					logger.info("Rollback performed.");
					// only whole commit intervals survived the rollback
					logger.info("Number of commited records: " + (recCount / recordsInCommit)*recordsInCommit);
//					logger.info("Last " + recCount % recordsInCommit  + " records not commited");
				}else if (errorAction == ConnectionAction.COMMIT){
					logger.info("Number of commited records: " + ++recCount);
				}
				throw new JetelException("Maximum # of errors exceeded when inserting record. "+ errmes, exception);
			}
			//if needed, commit
			if ((recordsInCommit!=Integer.MAX_VALUE && ++recCount % recordsInCommit == 0) || atomicSQL) {
				connection.commit();
			}
			SynchronizeUtils.cloverYield();
		}
 		// end of records stream - final commits;
		 // unless we have option never to commit, commit at the end of processing
	    if (runIt && recordsInCommit!=Integer.MAX_VALUE){
	    	connection.commit();	
	    }else if (!runIt) {//component execution aborted
			errorAction.perform(connection);
			if (errorAction == ConnectionAction.ROLLBACK) {
				logger.info("Rollback performed.");
				logger.info("Number of commited records: " + (recCount / recordsInCommit)*recordsInCommit);
//				logger.info("Last " + recCount % recordsInCommit + " records not commited");
			}else if (errorAction == ConnectionAction.COMMIT){
				logger.info("Number of commited records: " + ++recCount);
			}
	    }
	    
	}


	/**
	 * Writes records using JDBC batching: records are accumulated into batches
	 * of {@code batchSize} and executed together via {@link #executeBatch};
	 * commits happen every {@code recordsInCommit} records. When the rejected
	 * port is connected, a copy of every batched record is kept in
	 * {@code dataRecordHolder} so failed records can be reported after the
	 * batch executes.
	 * <p>
	 * Fix: the final flush passed {@code holderCount} (the index of the last
	 * buffered record) to {@code executeBatch} while both in-loop call sites
	 * pass {@code holderCount + 1} (the record count); the index form silently
	 * dropped the last record from batch-error reporting. The final flush now
	 * uses {@code holderCount + 1} as well.
	 *
	 * @throws JetelException when the number of SQL errors exceeds {@code maxErrors}
	 */
	private void runInBatchMode() throws SQLException,InterruptedException,IOException, JetelException{
	    int batchCount=0;
    	int statementCount=0;
    	String errmes = "";
    	SQLException exception = null;
	    DataRecordMetadata rejectedMetadata = null;
	    if (rejectedPort != null) {
	    	rejectedMetadata = rejectedPort.getMetadata();
	    }
	    // holder[i][j] keeps the rejected-port copy of the j-th batched record
	    // of statement i; holderCount is the index of the last filled slot.
	    DataRecord[][] dataRecordHolder;
	    int holderCount= -1;
	    
        // first, we set transMap to batchUpdateMode
		for (SQLCloverStatement eachStatement : statement) {
			eachStatement.setBatchUpdate(true);
		}
		
	    // if we have rejected records port connected, we will
	    // store and report error records in batch
	    if (rejectedPort!=null){
	        dataRecordHolder=new DataRecord[statement.length][batchSize];
	        for (int i = 0; i < statement.length; i++) {
				for (int j = 0; j < batchSize; j++) {
					dataRecordHolder[i][j] = DataRecordFactory.newRecord(rejectedMetadata);
				}
			}
	    }else{
	        dataRecordHolder=null;
	    }
	    
	    while ((inRecord = inPort.readRecord(inRecord)) != null && runIt) {
			if (keysRecord != null){
				keysRecord.reset();
			}
			holderCount++;
			for (statementCount = 0; statementCount < statement.length; statementCount++) {
				try{
					statement[statementCount].addBatch(returnResult[statementCount] ? keysRecord : null);
					//prepare prospective rejected record
	                if (dataRecordHolder!=null) {
	                	dataRecordHolder[statementCount][holderCount].copyFieldsByName(inRecord);
						if (errMessFieldNum != -1) {
							dataRecordHolder[statementCount][holderCount].getField(errMessFieldNum).reset();
						}
						if (errorCodeFieldNum != -1){
							dataRecordHolder[statementCount][holderCount].getField(errorCodeFieldNum).reset();
						}
	                }
				}catch(SQLException ex){
	               countError++;
	               exception = ex;
	               errmes = "Exception thrown by: " + statement[statementCount].getQuery() + ". Message: " + ExceptionUtils.getMessage(ex);
	               //for this record statement won't be executed 
	               SQLException chain = ex.getNextException();
	               while(chain!=null) {
	                 errmes += "\n  Caused by: "+ExceptionUtils.getMessage(chain);
	                 chain = chain.getNextException();
	               }
					if (rejectedPort != null) {
						// mark the slot as failed-before-execution; the record
						// is reported immediately, not after the batch runs
						dataRecordHolder[statementCount][holderCount] = null;
						rejectedRecord.copyFieldsByName(inRecord);
						if (errMessFieldNum != -1) {
							rejectedRecord.getField(errMessFieldNum).setValue(errmes);
						}
						if (errorCodeFieldNum != -1){
							rejectedRecord.getField(errorCodeFieldNum).setValue(ex.getErrorCode());
						}
						rejectedPort.writeRecord(rejectedRecord);
					}
					if (countError <= MAX_WARNINGS) {
						logger.warn(errmes);
					}else if (countError == MAX_WARNINGS + 1){
						logger.warn("more errors...");
					}
					
				}
			}
			if (countError>maxErrors && maxErrors!=-1){
				logger.info("Number of commited records: " + recCount);
                throw new JetelException("Maximum # of errors exceeded when inserting record. "+ errmes, exception);
            }
			
            // shall we execute batch ?
			if ((++batchCount % batchSize == 0) || atomicSQL) {
				executeBatch(dataRecordHolder, holderCount+1);
				batchCount = 0;
				holderCount = -1;
			}
			if ((++recCount % recordsInCommit == 0) || atomicSQL) {
				if (batchCount != 0) {
					executeBatch(dataRecordHolder, holderCount+1);
					batchCount = 0;
					holderCount = -1;
				}
				connection.commit();
			}
		}
	    
		// final commit (if anything is left in batch)
	    if (batchCount > 0) {
	    	// FIX (off-by-one): pass the record count, consistent with the
	    	// in-loop call sites; 'holderCount' alone skipped the last record.
	    	executeBatch(dataRecordHolder, holderCount + 1);
	    }
	    
		// unless we have option never to commit, commit at the end of processing
		if (runIt && recordsInCommit != Integer.MAX_VALUE) {
			connection.commit();
			if (failedBatches > 0) {
				logger.warn("Number of failed batches: " + failedBatches);
			}
		} else if (!runIt) {
			errorAction.perform(connection);
			if (errorAction == ConnectionAction.ROLLBACK) {
				logger.info("Rollback performed.");
				logger.info("Number of commited records: " + (recCount / recordsInCommit) * recordsInCommit);
				logger.info("Number of failed batches: " + failedBatches);
				// logger.info("Last " + recCount % recordsInCommit + " records not commited");
			} else if (errorAction == ConnectionAction.COMMIT) {
				logger.info("Number of commited records: " + recCount);
				logger.info("Number of failed batches: " + failedBatches);
			}
		}
	}
	
    
	/**
	 * Executes the accumulated batch of every statement, routes generated-key
	 * records to the keys port and reports failed records via
	 * {@link #flushErrorRecords}. On databases that need savepoints
	 * (e.g. PostgreSQL, issue #5711) a savepoint is set before each batch so a
	 * failure does not poison the whole transaction.
	 *
	 * @param dataRecordHolder per-statement copies of batched records for
	 *        rejected-port reporting; may be null when no rejected port is used
	 * @param holderCount number of records currently buffered in the batch
	 * @throws JetelException when the number of SQL errors exceeds {@code maxErrors}
	 */
	private void executeBatch(DataRecord[][] dataRecordHolder, int holderCount)
	throws SQLException, IOException, InterruptedException, JetelException {
		boolean exThrown = false;
		String errmes = "";
		DataRecord[] updatedRecord = null;
		BatchUpdateException exception = null;
		// one slot per statement; filled only for statements whose batch failed
		BatchUpdateException[] exceptions = new BatchUpdateException[statement.length];
		boolean useSavepoints = connection.getJdbcSpecific().useSavepoints();

		for (int statementCount = 0; statementCount < statement.length; statementCount++) {
			try {
				// Fix of issue #5711
				if (useSavepoints && !atomicSQL) {
					try {
						savepoint = connection.setSavepoint(SAVEPOINT_NAME);
					} catch (SQLException e) {
						logger.warn("Failed to set SAVEPOINT; rest of transaction may be lost", e);
					}
				}

				statement[statementCount].executeBatch();
				
				updatedRecord = statement[statementCount].getBatchResult();
				statement[statementCount].clearBatch();
			} catch (BatchUpdateException ex) {
				// partial results may still be available after the failure
				updatedRecord = statement[statementCount].getBatchResult();
				statement[statementCount].clearBatch();
				exceptions[statementCount] = ex;
				exception = ex;
				errmes += "Exception thrown by: " + statement[statementCount].getQuery() + ". Message: " + ExceptionUtils.getMessage(ex) + "\n";
				if (ex.getNextException() != null) {
					// With PostgreSQL, 1. exception is good for nothing, append next one
					errmes += "  Caused by: " + ExceptionUtils.getMessage(ex.getNextException());
				}
				exThrown = true;
				
				if (useSavepoints && savepoint != null) {
					connection.rollback(savepoint);
				}
			}
			// NOTE(review): assumes getBatchResult() never returns null and that
			// keysPort is connected whenever it returns records — TODO confirm.
			for (int i = 0; i < updatedRecord.length; i++) {
				keysPort.writeRecord(updatedRecord[i]);
			}
		}
		// all statements executed, some of them could fail
		if (exThrown) {
			failedBatches++;
			countError++;
			if (countError <= MAX_WARNINGS) {
				logger.warn(errmes);
			} else if (countError == MAX_WARNINGS + 1) {
				logger.warn("more errors...");
			}

			flushErrorRecords(dataRecordHolder, holderCount, exceptions, rejectedPort);
			if (atomicSQL) {
				connection.rollback();
				logger.info("Atomic SQL is true. Rollback performed.");
			}
			if (countError > maxErrors && maxErrors != -1) {
				errorAction.perform(connection);
				if (errorAction == ConnectionAction.ROLLBACK) {
					logger.info("Rollback performed.");
					logger.info("Number of commited records: " + (recCount / recordsInCommit) * recordsInCommit);
					logger.info("Number of failed batches: " + failedBatches);
					// logger.info("Last " + batchCount + " records not commited");
				} else if (errorAction == ConnectionAction.COMMIT) {
					logger.info("Number of commited records: " + ++recCount);
					logger.info("Number of failed batches: " + failedBatches);
				}
				throw new JetelException("Maximum # of errors exceeded when executing batch. " + errmes, exception);
			}
		}

	}
	
    /**
     * Sends error records to the rejected port and counts errors. For each
     * statement that failed, the update counts of its
     * {@link BatchUpdateException} identify which batched records failed;
     * records beyond the reported update counts (for which the driver gave no
     * result) are flushed as generic failures. If {@code records} is null,
     * only error counting is performed.
     *
     * @param records potential rejected records (per statement, per batch slot);
     *        null slots mark records already reported before batch execution
     * @param recCount number of records in the batch
     *        (note: shadows the instance field of the same name)
     * @param ex per-statement batch exceptions; cleared (nulled) on return
     * @param rejectedPort port receiving the rejected records
     * @throws IOException
     * @throws InterruptedException
     */
    private void flushErrorRecords(DataRecord[][] records,int recCount, BatchUpdateException[] ex, OutputPort rejectedPort) 
    throws IOException,InterruptedException {

//        if (records==null) return;
        
        int[] updateCounts;
        int count;
        SQLException exception;
        StringBuilder message = new StringBuilder();

        //for each statement exception has occurred
        for (int i=0; i < ex.length; i++) {
			if (ex[i] != null) {
				exception = ex[i];
				updateCounts = ex[i].getUpdateCounts();
				count = 0;
				// Walk the update counts the driver reported; EXECUTE_FAILED
				// marks a record whose statement failed.
		        while(count<updateCounts.length){
					if (updateCounts[count] == Statement.EXECUTE_FAILED) {
						//increase error counter, fill rejected record and log error message
						countError++;
						if (records != null && records[i][count] != null) {
							if (exception != null) {
								if (errMessFieldNum != -1) {
									records[i][count].getField(errMessFieldNum).setValue("Exception thrown by: " + 
											statement[i].getQuery() + ". Message: " + ExceptionUtils.getMessage(exception));
								}
								if (errorCodeFieldNum != -1){
									records[i][count].getField(errorCodeFieldNum).setValue(exception.getErrorCode());
								}
							}
							if (exception != null && countError <= MAX_WARNINGS) {
								logger.warn("Exception thrown by: " + statement[i].getQuery() + 
										". Message: " + ExceptionUtils.getMessage(exception));
							} else if (exception == null && countError <= MAX_WARNINGS) {
								logger.warn("Record not inserted to database");
							} else if (countError == MAX_WARNINGS + 1) {
								logger.warn("more errors...");
							}
							rejectedPort.writeRecord(records[i][count]);
							// advance along the chained exceptions — one per failed record
							if (exception != null) {
								exception = exception.getNextException();
							}
						}else if (records != null){//records[i][count] == null - it wasn't added to batch, prepare for next batch
							records[i][count] = DataRecordFactory.newRecord(rejectedPort.getMetadata());
						}
					}
					count++;
		        }
		        // flush rest of the records for which we don't have update counts
		        message.setLength(0);
		        Integer errCode = exception != null ? exception.getErrorCode() : null;
		    	while (exception != null) {
		    		message.append(exception.getMessage());
		    		exception = exception.getNextException();
		    	}
		        while(count<recCount){
		    		if (records != null && records[i][count] != null) {
						if (message.length() > 0 && countError <= MAX_WARNINGS) {
							logger.warn(message);
						} else if (message.length() > 0
								&& countError == MAX_WARNINGS + 1) {
							logger.warn("more errors...");
						}
						// NOTE(review): 'i < recCount' compares the statement index
						// with the record count — looks like 'count < recCount'
						// (always true here) was intended; verify before changing.
						if (message.length() > 0 && i < recCount) {
							if (errMessFieldNum != -1) {
								records[i][count].getField(errMessFieldNum).setValue(message);
							}
							if (errorCodeFieldNum != -1){
								records[i][count].getField(errorCodeFieldNum).setValue(errCode);
							}
						}
						// subsequent records get a generic message only
						message = new StringBuilder("Record not inserted to database");
						countError++;
						rejectedPort.writeRecord(records[i][count]);
					}else if (records != null){
						records[i][count] = DataRecordFactory.newRecord(rejectedPort.getMetadata());
					}
					count++;
		        }
			}
		}

        //clear errors
        Arrays.fill(ex, null);
    }
	
	/**
	 * Creates a {@code DBOutputTable} instance from its XML definition. The
	 * SQL to execute may come from (checked in this order) a query URL, an
	 * inline sqlQuery attribute, a dbTable attribute (statement is assembled
	 * later), or a nested SQLCode element.
	 *
	 * @param graph transformation graph the component belongs to
	 * @param xmlElement XML element describing the component
	 * @return configured component instance
	 * @throws XMLConfigurationException on invalid XML configuration
	 * @throws AttributeNotFoundException when a required attribute is missing
	 * @since September 27, 2002
	 */
     @SuppressWarnings("deprecation")
	public static Node fromXML(TransformationGraph graph, Element xmlElement) throws XMLConfigurationException, AttributeNotFoundException {
		ComponentXMLAttributes xattribs = new ComponentXMLAttributes(xmlElement, graph);
		ComponentXMLAttributes xattribsChild;
		org.w3c.dom.Node childNode;
		DBOutputTable outputTable;

		// allows specifying parameterized SQL (with ? - question marks)
		if (xattribs.exists(XML_URL_ATTRIBUTE)){
			outputTable = new DBOutputTable(xattribs.getString(XML_ID_ATTRIBUTE), xattribs.getString(XML_DBCONNECTION_ATTRIBUTE, null));
			outputTable.setQueryURL(xattribs.getStringEx(XML_URL_ATTRIBUTE, RefResFlag.URL));
		}else if (xattribs.exists(XML_SQLQUERY_ATRIBUTE)) {
				outputTable = new DBOutputTable(xattribs.getString(XML_ID_ATTRIBUTE),
				xattribs.getString(XML_DBCONNECTION_ATTRIBUTE, null),
				SQLUtil.split(xattribs.getString(XML_SQLQUERY_ATRIBUTE)));
		}else if(xattribs.exists(XML_DBTABLE_ATTRIBUTE)){
			outputTable = new DBOutputTable(xattribs.getString(XML_ID_ATTRIBUTE),
					xattribs.getString(XML_DBCONNECTION_ATTRIBUTE, null),
					xattribs.getString(XML_DBTABLE_ATTRIBUTE));
		}else if ((childNode = xattribs.getChildNode(xmlElement, XML_SQLCODE_ELEMENT)) != null) {
			// SQL supplied as a nested SQLCode element
            xattribsChild = new ComponentXMLAttributes((Element)childNode, graph);
            outputTable = new DBOutputTable(xattribs.getString(XML_ID_ATTRIBUTE),
					xattribs.getString(XML_DBCONNECTION_ATTRIBUTE, null),
					SQLUtil.split(xattribsChild.getText(childNode)));
		} else {
			// no SQL source given; checkConfig() reports the missing attribute
			outputTable = new DBOutputTable(xattribs.getString(XML_ID_ATTRIBUTE), xattribs.getString(XML_DBCONNECTION_ATTRIBUTE, null));
		}
		
		if (xattribs.exists(XML_DBTABLE_ATTRIBUTE)) {
			outputTable.setDBTableName(xattribs.getString(XML_DBTABLE_ATTRIBUTE));
		}
		// fieldMap ("clover:=db" pairs) takes precedence over the separate
		// dbFields / cloverFields attributes
		if (xattribs.exists(XML_FIELDMAP_ATTRIBUTE)){
			String[] pairs = StringUtils.split(xattribs.getStringEx(XML_FIELDMAP_ATTRIBUTE, RefResFlag.SPEC_CHARACTERS_OFF));
			String[] cloverFields = new String[pairs.length];
			String[] dbFields = new String[pairs.length];
			String[] mapping;
			for (int i=0;i<pairs.length;i++){
				mapping = JoinKeyUtils.getMappingItemsFromMappingString(pairs[i]);//:= or =
				cloverFields[i] = mapping[0];
				dbFields[i] = mapping[1];
			}
			outputTable.setCloverFields(cloverFields);
			outputTable.setDBFields(dbFields);
		}else {
			if (xattribs.exists(XML_DBFIELDS_ATTRIBUTE)) {
				outputTable.setDBFields(xattribs.getString(XML_DBFIELDS_ATTRIBUTE).split(Defaults.Component.KEY_FIELDS_DELIMITER_REGEX));
			}

			if (xattribs.exists(XML_CLOVERFIELDS_ATTRIBUTE)) {
				outputTable.setCloverFields(xattribs.getString(XML_CLOVERFIELDS_ATTRIBUTE).split(Defaults.Component.KEY_FIELDS_DELIMITER_REGEX));
			}
		}
		if (xattribs.exists(XML_COMMIT_ATTRIBUTE)) {
			outputTable.setRecordsInCommit(xattribs.getInteger(XML_COMMIT_ATTRIBUTE));
		}
		
		if (xattribs.exists(XML_BATCHMODE_ATTRIBUTE)) {
			outputTable.setUseBatch(xattribs.getBoolean(XML_BATCHMODE_ATTRIBUTE));
		}
		if (xattribs.exists(XML_BATCHSIZE_ATTRIBUTE)) {
			outputTable.setBatchSize(xattribs.getInteger(XML_BATCHSIZE_ATTRIBUTE));
		}
		if (xattribs.exists(XML_MAXERRORS_ATRIBUTE)){
			outputTable.setMaxErrors(xattribs.getInteger(XML_MAXERRORS_ATRIBUTE));
		}
		if (xattribs.exists(XML_AUTOGENERATEDCOLUMNS_ATTRIBUTE)){
			outputTable.setAutoGeneratedColumns(xattribs.getString(XML_AUTOGENERATEDCOLUMNS_ATTRIBUTE).split(Defaults.Component.KEY_FIELDS_DELIMITER_REGEX));
		}
		if (xattribs.exists(XML_ACTION_ON_ERROR)){
			outputTable.setErrorAction(xattribs.getString(XML_ACTION_ON_ERROR));
		}
		if (xattribs.exists(XML_ATOMIC_RECORD_STATEMENT_ATTRIBUTE)){
			outputTable.setAtomicSQL(xattribs.getBoolean(XML_ATOMIC_RECORD_STATEMENT_ATTRIBUTE));
		}
		if (xattribs.exists(XML_CHARSET_ATTRIBUTE)){
			outputTable.setCharset(xattribs.getString(XML_CHARSET_ATTRIBUTE));
		}
		
		return outputTable;
	}

	/**
	 * Validates the component configuration: checks ports, required
	 * attributes, resolves the DB connection and, when possible, prepares and
	 * checks each SQL statement against the metadata. The DB connection opened
	 * here is always freed before returning.
	 */
	@Override
     public ConfigurationStatus checkConfig(ConfigurationStatus status) {
         super.checkConfig(status);
         
         if(!checkInputPorts(status, 1, 1)
        		 || !checkOutputPorts(status, 0, 2, false)) {
        	 return status;
         }
         
         if (sqlQuery == null && queryURL == null && dbTableName == null) {
         	status.addError(this, null,
         			"One of " + XML_SQLQUERY_ATRIBUTE + ", " + XML_URL_ATTRIBUTE + " or " + XML_DBTABLE_ATTRIBUTE + " must be specified.");
         }
         if (dbConnectionName == null) {
         	status.addError(this, XML_DBCONNECTION_ATTRIBUTE, "DB connection not defined.");
         }
         // without SQL source and connection no further checking is possible
         if ((sqlQuery == null && queryURL == null && dbTableName == null ) || dbConnectionName == null) {
        	 return status;
         }

         try {
//             init();
        	 
			IConnection conn = getGraph().getConnection(dbConnectionName);
			if (conn == null) {
				throw new ComponentNotReadyException(
						"Can't find DBConnection ID: " + dbConnectionName, XML_DBCONNECTION_ATTRIBUTE);
			}
			if (!(conn instanceof DBConnection)) {
				throw new ComponentNotReadyException("Connection with ID: "
						+ dbConnectionName
						+ " isn't instance of the DBConnection class.", XML_DBCONNECTION_ATTRIBUTE);
			}
			dbConnection = (DBConnection) conn;
			dbConnection.init();

			// create connection instance, which represents connection to a
			// database
			try {
				connection = dbConnection.getConnection(getId(),
						OperationType.WRITE);
			} catch (JetelException e1) {
				throw new ComponentNotReadyException(e1);
			}

			inPort = getInputPort(READ_FROM_PORT);
			connection.getJdbcSpecific().checkMetadata(status, getInMetadata(), this);
			// when only a table name was given, assemble the INSERT statement
			// from the table name and (db or input) fields
			if (sqlQuery == null && queryURL == null) {
				sqlQuery = new String[1];
				// TODO Labels replace:
				if (dbFields != null) {
					sqlQuery[0] = SQLUtil.assembleInsertSQLStatement(
							dbTableName, dbFields, dbConnection.getJdbcSpecific());
				} else {
					sqlQuery[0] = SQLUtil.assembleInsertSQLStatement(inPort
							.getMetadata(), dbTableName, dbConnection.getJdbcSpecific());
				}
				// TODO Labels replace end
				
				// TODO Labels replace with:
//				// FIXME This also replaces escaped characters from dbTableName
//				// can lead to backslashes being consumed
//				String quotedTableName = StringUtils.stringToSpecChar(dbConnection.getJdbcSpecific().quoteIdentifier(dbTableName));
//				if (dbFields != null) {
//					String[] quotedDbFields = new String[dbFields.length];
//					for (int i = 0; i < dbFields.length; i++) {
//						quotedDbFields[i] = dbConnection.getJdbcSpecific().quoteIdentifier(dbFields[i]);
//					}
//					sqlQuery[0] = SQLUtil.assembleInsertSQLStatement(
//							quotedTableName, quotedDbFields);
//				} else {
//					sqlQuery[0] = SQLUtil.assembleInsertSQLStatement(inPort
//							.getMetadata(), quotedTableName);
//				}
				// TODO Labels replace with end
			}
			if (sqlQuery == null) {
				// no more checking, this branch is reached when query is specified using Query URL
				return status;
			}
			boolean supportsConnectionKeyGenaration = false;
			try {
				supportsConnectionKeyGenaration = connection.getJdbcSpecific()
						.supportsGetGeneratedKeys(connection.getMetaData());
			} catch (SQLException e1) {
				// TODO Auto-generated catch block
			}
			if (inPort.getMetadata() != null) {
				inRecord = DataRecordFactory.newRecord(inPort.getMetadata());
				// [start, end) selects the slice of cloverFields belonging to
				// each query, based on its number of '?' placeholders
				int start = 0, end;
				for (int i = 0; i < sqlQuery.length; i++) {
					String[] tmpCloverFields = null;
					if (cloverFields != null) {
						end = StringUtils.count(sqlQuery[i], '?');
						tmpCloverFields = new String[end - start];
						for (int j = 0; j < tmpCloverFields.length; j++) {
							if (start + j >= cloverFields.length) {
								throw new ComponentNotReadyException(
										this,
										XML_CLOVERFIELDS_ATTRIBUTE,
										"Missing parameter value for query "
												+ StringUtils
														.quote(sqlQuery[i]
																+ " , parameter number: "
																+ (start + j + 1)));
							}
							tmpCloverFields[j] = cloverFields[start + j];
						}
						start = end;
					}
					SQLCloverStatement statement = new SQLCloverStatement(
							connection, sqlQuery[i], inRecord, tmpCloverFields,
							autoGeneratedColumns);
					// generated-keys retrieval is silently disabled when batch
					// mode is on or the driver does not support it
					if (statement.getQueryType() == QueryType.INSERT && statement.returnResult()) {
						if (useBatch) {
							logger.warn("Getting generated keys in batch mode is not supported -> switching it off !");
							sqlQuery[i] = sqlQuery[i].substring(0, SQLCloverStatement.indexOfReturning(sqlQuery[i], connection));
							statement = new SQLCloverStatement(connection, sqlQuery[i], inRecord, cloverFields);
						} else if (!supportsConnectionKeyGenaration) {
							logger.warn("DB indicates no support for getting generated keys -> switching it off !");
							sqlQuery[i] = sqlQuery[i].substring(0, SQLCloverStatement.indexOfReturning(sqlQuery[i], connection));
							statement = new SQLCloverStatement(connection, sqlQuery[i], inRecord, cloverFields);
						}
					}
					try {
						statement.init();
					} catch (SQLException e) {
						// reported as a warning only (see catch below)
						throw new ComponentAlmostNotReadyException(this, e);
					}
					try {
						try {
							keysPort = getOutputPort(WRITE_AUTO_KEY_TO_PORT);
							statement.checkConfig(status, keysPort == null ? null
									: keysPort.getMetadata(), this);
                        } finally {
                            // make sure we do not leak statements
                            statement.close();
                        }
					} catch (SQLException e) {
						status.addWarning(this, null, e);
					}                        
				}
			}

		}  catch (UnsupportedOperationException uoe) {
    		//it isn't possible to perform check config (for example some method of db driver throws the exception)
    		//this means we don't know whether the configuration is valid or not
    		status.addWarning(this, null,
    				"Cannot check the configuration of component. Used driver does not implement some required methods.", uoe);
    	} catch (ComponentAlmostNotReadyException e1) {
			status.addWarning(this, null, e1);
		} catch (ComponentNotReadyException e) {
			status.addError(this, null, e);
		} finally {
			// always release the connection opened for checking
			if (dbConnection != null) {
				dbConnection.free();
			}
		}

		return status;
    }

	/**
	 * Sets the maximum number of SQL errors tolerated during a component run.
	 * Default is 0 (zero); -1 means unlimited.
	 *
	 * @param maxErrors maximum tolerated error count
	 */
	public void setMaxErrors(int maxErrors) {
		this.maxErrors = maxErrors;
	}

	/**
	 * Sets the names of database columns whose generated values should be
	 * retrieved back from the database after insert.
	 *
	 * @param autoGeneratedColumns column names to retrieve
	 */
	public void setAutoGeneratedColumns(String[] autoGeneratedColumns) {
		this.autoGeneratedColumns = autoGeneratedColumns;
	}
	
	/**
	 * @return names of database columns retrieved back from the database
	 *         after insert
	 */
	public String[] getAutoGeneratedColumns() {
		return autoGeneratedColumns;
	}

	/**
	 * Sets the action (commit/rollback) performed on the connection when the
	 * error limit is exceeded or the run is aborted.
	 *
	 * @param errorAction action to perform on error
	 */
	public void setErrorAction(ConnectionAction errorAction) {
		this.errorAction = errorAction;
	}
	
	/**
	 * Sets the error action by its name.
	 *
	 * @param errorAction name of a {@link ConnectionAction} constant
	 * @throws IllegalArgumentException when the name matches no constant
	 */
	public void setErrorAction(String errorAction) {
		ConnectionAction action = ConnectionAction.valueOf(errorAction);
		this.errorAction = action;
	}

	/**
	 * Sets the identifier of the DB connection to write through.
	 *
	 * @param dbConnection connection ID within the graph
	 */
	public void setDBConnection(String dbConnection) {
		this.dbConnectionName = dbConnection;
	}
	
	/**
	 * Marker exception used by {@code checkConfig()}: signals a configuration
	 * problem that should be reported only as a warning (not an error) on the
	 * configuration status.
	 */
	static class ComponentAlmostNotReadyException extends ComponentNotReadyException {

		private static final long serialVersionUID = 1407613894034913595L;

		public ComponentAlmostNotReadyException(IGraphElement element,
				Exception ex) {
			super(element, ex);
		}
		
	}

	/**
	 * No metadata is propagated towards the input side of this component.
	 *
	 * @return always {@code null}
	 */
	@Override
	public MVMetadata getInputMetadata(int portIndex, MetadataPropagationResolver metadataPropagationResolver) {
		return null;
	}

	/**
	 * Propagates metadata to the first output (rejected records) port: the
	 * input record metadata is duplicated and extended with auto-filled
	 * ErrCode/ErrText fields.
	 *
	 * @return derived metadata for port 0 when input metadata is resolvable,
	 *         otherwise {@code null}
	 */
	@Override
	public MVMetadata getOutputMetadata(int portIndex, MetadataPropagationResolver metadataPropagationResolver) {
		if(portIndex == 0){
			InputPort inputPort = getInputPort(0);
			if(inputPort != null){
				MVMetadata inputMetadata = metadataPropagationResolver.findMetadata(inputPort.getEdge());
				if (inputMetadata != null) {
					DataRecordMetadata recordMetadata = inputMetadata.getModel().duplicate();
					
					// error fields appended after all input fields
					DataFieldMetadata errCodeField = new DataFieldMetadata("ErrCode", DataFieldType.INTEGER, inputMetadata.getModel().getFieldDelimiter());
					DataFieldMetadata errTextField = new DataFieldMetadata("ErrText", inputMetadata.getModel().getRecordDelimiter());
					errCodeField.setAutoFilling(AutoFilling.ERROR_CODE);
					errTextField.setAutoFilling(AutoFilling.ERROR_MESSAGE);
					
					recordMetadata.addField(errCodeField);
					recordMetadata.addField(errTextField);
					
					// plain literal instead of 'new String(...)' — Map.remove()
					// compares keys via equals(), so behavior is identical
					recordMetadata.getRecordProperties().remove("previewAttachment");
					MVMetadata metadata = metadataPropagationResolver.createMVMetadata(recordMetadata, this, OUT_METADATA_ID_SUFFIX);
					return metadata;
				}
			}
		}
		return null;
	}
	
	
}
	
